package com.atguigu.app;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.atguigu.constants.GmallConstants;
import com.atguigu.util.MyKafkaSender;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Random;

/**
 * Polls a Canal server for row-change entries of the subscribed database and forwards
 * selected rows to Kafka as JSON strings.
 *
 * <p>Routing (see {@link #handle}):
 * <ul>
 *   <li>{@code order_info} INSERT rows go to {@code GmallConstants.KAFKA_TOPIC_ORDER}</li>
 *   <li>{@code order_detail} INSERT rows go to {@code GmallConstants.KAFKA_TOPIC_ORDER_DETAIL}</li>
 *   <li>{@code user_info} INSERT and UPDATE rows go to {@code GmallConstants.KAFKA_TOPIC_USER}</li>
 * </ul>
 * Every other table/event combination is ignored.
 */
public class CanalClient {
    /** Host name of the Canal server. */
    private static final String CANAL_HOST = "hadoop102";
    /** Port of the Canal server. */
    private static final int CANAL_PORT = 11111;
    /** Canal destination (instance) name passed to the connector factory. */
    private static final String CANAL_DESTINATION = "example";
    /** Subscription filter: every table of the {@code gmall211126} database. */
    private static final String SUBSCRIBE_FILTER = "gmall211126.*";
    /** Size argument passed to {@code CanalConnector.get} on every poll. */
    private static final int BATCH_SIZE = 100;
    /** How long to sleep when a poll returned no entries. */
    private static final long IDLE_SLEEP_MILLIS = 5000L;

    /**
     * Connects to Canal, subscribes once, then polls in an endless loop and dispatches
     * every ROWDATA entry to {@link #handle}.
     *
     * <p>The loop ends only when the thread is interrupted while idling or when an
     * exception escapes; in both cases the connector is disconnected before returning.
     *
     * @param args unused
     * @throws InvalidProtocolBufferException if an entry's stored value cannot be parsed
     *                                        as a {@code RowChange}
     */
    public static void main(String[] args) throws InvalidProtocolBufferException {
        // 1. Create the connector (empty user name and password).
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(CANAL_HOST, CANAL_PORT), CANAL_DESTINATION, "", "");

        // 2. Open the connection once; re-doing this on every poll is unnecessary.
        canalConnector.connect();
        try {
            // 3. Subscribe once to the database whose changes we want.
            canalConnector.subscribe(SUBSCRIBE_FILTER);

            while (true) {
                // 4. Fetch the next batch; one message may carry the results of several SQL statements.
                Message message = canalConnector.get(BATCH_SIZE);

                // 5. Each entry corresponds to the result of one SQL statement.
                List<CanalEntry.Entry> entries = message.getEntries();

                if (entries.isEmpty()) {
                    // Nothing new: back off for a while before polling again.
                    try {
                        Thread.sleep(IDLE_SLEEP_MILLIS);
                        System.out.println("没有数据，休息一会！！！");
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop polling so that the
                        // process can shut down instead of spinning on a failed sleep.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    continue;
                }

                for (CanalEntry.Entry entry : entries) {
                    // 7. Only ROWDATA entries carry serialized row changes.
                    if (!CanalEntry.EntryType.ROWDATA.equals(entry.getEntryType())) {
                        continue;
                    }

                    // 6. Table the change belongs to.
                    String tableName = entry.getHeader().getTableName();

                    // 8-9. Deserialize the stored bytes into a RowChange.
                    ByteString storeValue = entry.getStoreValue();
                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);

                    // 10. Kind of event (INSERT, UPDATE, ...).
                    CanalEntry.EventType eventType = rowChange.getEventType();

                    // 11. All rows affected by this statement.
                    List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();

                    // Route the rows by table name and event type.
                    handle(tableName, eventType, rowDatasList);
                }
            }
        } finally {
            // Release the connection on every exit path (interrupt or exception).
            canalConnector.disconnect();
        }
    }

    /**
     * Routes the rows of one change event to the Kafka topic that matches its table
     * and event type; combinations that are not listed are dropped.
     *
     * @param tableName    name of the table the rows belong to
     * @param eventType    type of the change event
     * @param rowDatasList rows affected by the event
     */
    private static void handle(String tableName, CanalEntry.EventType eventType, List<CanalEntry.RowData> rowDatasList) {
        boolean isInsert = CanalEntry.EventType.INSERT.equals(eventType);
        boolean isUpdate = CanalEntry.EventType.UPDATE.equals(eventType);

        if ("order_info".equals(tableName) && isInsert) {
            // Newly created orders.
            saveToKafka(rowDatasList, GmallConstants.KAFKA_TOPIC_ORDER);
        } else if ("order_detail".equals(tableName) && isInsert) {
            // Newly created order details.
            saveToKafka(rowDatasList, GmallConstants.KAFKA_TOPIC_ORDER_DETAIL);
        } else if ("user_info".equals(tableName) && (isInsert || isUpdate)) {
            // New users as well as changes to existing users.
            saveToKafka(rowDatasList, GmallConstants.KAFKA_TOPIC_USER);
        }
    }

    /**
     * Converts each row's after-change columns into a JSON object (column name to
     * column value), prints it to standard output and sends it to the given topic.
     *
     * @param rowDatasList rows to publish
     * @param kafkaTopic   Kafka topic the rows are sent to
     */
    private static void saveToKafka(List<CanalEntry.RowData> rowDatasList, String kafkaTopic) {
        for (CanalEntry.RowData rowData : rowDatasList) {
            // Collect every column of the row as it looks after the change.
            JSONObject jsonObject = new JSONObject();
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                jsonObject.put(column.getName(), column.getValue());
            }

            // Serialize once and reuse the string for both the log line and the send.
            String payload = jsonObject.toJSONString();
            System.out.println(payload);

            // Note: a random 0-5 second sleep can be inserted here to simulate
            // network latency when testing downstream consumers.
            MyKafkaSender.send(kafkaTopic, payload);
        }
    }
}
